其他
掰开揉碎系列|详解Redis的Sorted-Set底层
The following article is from 学而思网校技术团队 Author 赵远通
背景:本文从一个有序单链表的数据结构开始,讲解有序单链表的插入、查询过程,通过插入、查询过程来引出有序单链表对于"高效"插入、查询的问题,然后通过对有序单链表的不断的改造,逐渐形成一个新的数据结构-跳跃表,并推导出跳跃表的、插入、查询时间复杂度,然后引出我们的主题<>的实现,通过对其数据结构、高度算法、插入、查询、删除的逐行源码分析让大家充分的了解其底层的实现,最后再通过跳表跟哈希表、平衡树、Btree之间的对比来说明为什么Redis会选择跳跃表这种数据结构作为Sorted-Set的实现。希望通过本文让大家真正掌握跳表这种数据结构,掌握Redis的Sorted-Set底层实现的原理。
01
单链表
typedef struct node {
int data;//代表数据域
struct LinkNode *next;//代表指针域,指向直接后继元素
}LinkNode;
LinkNode* InitLinkNode(){
//创建一个头结点
LinkNode *p = (LinkNode *)malloc(sizeof(LinkNode));
//声明一个指针指向头结点,用于遍历链表
LinkNode *temp = p;
//生成链表
for (int i = 1; i < 40; i += 10) {
LinkNode *node = (LinkNode *)malloc(sizeof(LinkNode));
node->data = i;
a->next = NULL;
temp->next = a;
temp = temp->next;
}
return p;
}
LinkNode* insert(LinkNode *p, int data){
LinkNode *temp = p;//创建临时结点temp
while(temp != NULL) {
if (temp->next == NULL || temp->next->data > data)
{
LinkNode *tempNode = (LinkNode *)malloc(sizeof(LinkNode));
tempNode->data = data;
tempNode->next = temp->next;
temp->next = tempNode;
break;
}
temp = temp->next;
}
return temp;
}
int Find(LinkNode *p,int data){
LinkNode *temp = p;
while (temp != NULL) {
if (temp->next != NULL && temp->next->data == data) {
return 1;
}
temp = temp->next;
}
return -1;
}
02
跳跃表
那么一级索引结点个数N / 2,那么假如我们的索引有M阶,第M阶索引就是:N / 2^M 假如第M阶的结点个数是 2,那么可以得出 N / 2^m = 2,得出时间复杂度为M = log(N) - 1 假如我们每次遍历 K个结点,那么得出时间复杂度是O(K * log(N)) K一般是个常数,所以最后的时间复杂度是O(log(N))
03
Redis跳跃表底层存储
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span;
} level[];
} zskiplistNode;
结构属性介绍:ele:存的是集合的值 score:存的一个double类型的排序字段,通过这个字段来进行集合值的排序 backward:链表指针,指向当前元素的前一个元素 Level[]:跳表的高度
forward:指向下一个元素 span:跨度,当前结点到下一个结点中间元素的个数
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
结构属性介绍:header:指向跳表的头结点 tail:指向跳表的尾结点 length:跳表的高度 level:当前调整的高度
//跳表最高层级为64
#define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
//跳表的随机因子为 0.25
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
根据代码我们先回答一下第一个问题:怎么来确定跳表的高度?回答:随机生成,对没有看错,就是随机生成,但是为了保证查询的性能,我们尽量让找出一套算法能够达到一个O(log(N))的时间复杂度,我们先看一下这个函数,random()会生成一个32位的随机数,跟 0xFFFF做与操作,其实就是把高16位清零,得到一个处于0x0000-0xFFFF的数字,然后每次循环判断生产的这个数字是否小于自己的1/4,如果成立则高度+1,循环结束在判断一下生产的level是否小于 64层,小于的话,就用生成的层数,否则用64层。我们看一下推导过程,我们定义随机因子为p:结点层数至少为1,而大于1的结点层数,满足一个概率分布。 结点层数恰好等于1的概率为1 - p 结点层数大于等于2的改为为p,而结点层数等于2的概率为p * (1 - p) 结点层数大于等于3的概率为p * p,而结点层数等于3的概率为p * p * (1-p) 结点层数大于等于4的概率为p * p * p,而结点层数等于4的概率为p * p * p * (1-p) 结点层数大于等于5的概率为p * p * p * p,而结点层数等于5的概率为p * p * p * p * (1-p) …
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
//先申请一块内存
zsl = zmalloc(sizeof(*zsl));
//默认给跳表的高度赋值为1
zsl->level = 1;
//默认给跳表的长度赋值为0
zsl->length = 0;
//创建一个头结点,然后跳表指向头结点,可参考函数zslCreateNode
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
//这里是循环的给头结点的每个元素赋值
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
//跳表尾部指向为NULL
zsl->tail = NULL;
return zsl;
}
创建一个skiplistNode结点//创建一个skiplistNode结点,有三个参数:level: 层级,score: 排序key,sds: 跳表值
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
//同样先通过传过来的高度申请结点的内存
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
//给跳表结点score赋值
zn->score = score;
//给跳表结点ele赋值
zn->ele = ele;
return zn;
}
我们通过图3.2来了解一下,头结点初始化后结构是什么样的。/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
//赋值跳表的头结点给变量x
x = zsl->header;
//从最高层开始查找我们合适的位置
for (i = zsl->level-1; i >= 0; i--) {
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
//生产随机的高度
level = zslRandomLevel();
//如果新节点的层数比表中其他节点的层数都要大
//那么初始化表头节点中未使用的层,并将它们记录到 update 数组中
//将来也指向新节点
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
//创建一个新的结点
x = zslCreateNode(level,score,ele);
//插入新结点的过程
for (i = 0; i < level; i++) {
//设置新节点的 forward 指针
x->level[i].forward = update[i]->level[i].forward;
//将旧结点的各个节点的 forward 指针指向新节点
update[i]->level[i].forward = x;
//计算新结点的跨度
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
//更新结点插入之后,旧结点的跨度
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
//调整新结点的backward
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
//更新跳表的长度
zsl->length++;
return x;
}
通过对整体函数的解析,我们发现插入一个元素基本上分为4个步骤:查到插入的位置(查询过程); 生产新结点高度并调整跳表的高度; 插入元素; 调整backward并更新跳表的长度; 下面我们统一逐步分析每一次循环的代码来了解一下整个插入过程。
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
查找节点(score=21,level=2)的插入位置,逻辑如下:第一次for循环,i=1。x现在为跳跃表的头节点; 现在i的值与zsl->level-1相等,所以rank[1]的值为0; header->level[1].forward存在,并且header->level[1].forward->score(1)小于要插入值的score,所以while循环可以进入,rank[1]=1,x赋值为第一个节点; 第一个节点的第1层的forward指向NULL,所以while循环不会再进入,经过第一次for循环,rank[1]=1,**x和update[1]**都为第一个节点(score=1)。
for循环进入第二次,i=0。x为跳跃表第一个节点(score=1) 现在i的值与zsl->level-1不相等,所以rank[0]等于rank[1]的值赋值为1; x->level[0]->forward存在,并且**x->level[0].foreard->score(11)小于要插入的score,所以while循环可以进入,rank[0]=2,x为第二个节点(score=11)。 x->level[0]->forward存在,并且x->level[0].foreard->score(23)**大于要插入的score,所以while不会再进入,经过第二次for循环,rank[0]=2,x和update[0]都为第二个节点(score=11),如图3.5所示:
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
假如我们生成的高度是3,这个时候我们需要调整一下整个跳表的高度,此时i的值是2,level的值是3,所以我们只能进入一次循环,此时rank[2] = 0,update[2]指向头结点,update[2]->level[2].span = 3,zsl->level = 3,调整完高度,如图3.6所示:x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
level的值为3,所以for循环可以执行三遍,插入过程如下:for循环第一遍:x的level[0]的forward为update[0]的level[0]的forward节点,即x->level[0].forward为score=23的节点; **update[0]的level[0]**的下个节点为新插入的节点; rank[0]的值为0,update[0]->level[0].span=1,所以x->level[0].span=1、update[0]->level[0].span=1。
x的level[1]的forward为update[1]的level[1]的forward节点,即x->level[1].forward为NULL; **update[1]的level[1]**的下个节点为新插入的节点; rank[1]的值为1,update[1]->level[1].span等于2,所以x->level[1].span=1 update[1]->level[1].span=2。
x的level[2]的forward为update[2]的level[2]的forward节点,即x->level[2].forward为NULL; update[2]的level[2]的下个节点为新插入的节点; rank[2]的值为2,因为update[2]->level[2].span等于跳跃表的总长度3,所以x->level[2].span=1 update[2]->level[2].span=3。
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
x->backward的值等于,判断update[0] == zsl->header是否相等,如果想当代表是个空跳表,则赋值为NULL,如果不是的话,就把update[0]的赋给它,也就是说指向score = 11的backward。第二步判断是否是尾结点,如果不是则调整对应的backward,如果是就更新跳表的尾部指向。最后再更新跳表的length = 4,调整完的结构如图3.10所示:unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
//先把头结点保存起来
x = zsl->header;
//从跳表的最高层开始循环查找
for (i = zsl->level-1; i >= 0; i--) {
//先从最高层开始查找
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
//如果查找到相同的元素,则直接返回,rank
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
//否则返回0
return 0;
}
我们可以通过源代码来看,其实查找一个元素跟我们插入过程中查找插入位置的代码基本一致,具体查询过程此小节就不详细展开了,(可以查看插入过程查找插入位置的详细讲解)。3.6 Sorted-Set查询多个元素源码分析3.6.1 整体源码分析查询多个元素我们通过zrange和zrevrange的命令的实现来进行源码分析:void zrangeGenericCommand(client *c, int reverse) {
robj *key = c->argv[1];
robj *zobj;
int withscores = 0;
long start;
long end;
long llen;
long rangelen;
//中间省略一下无用的代码
.....
//获取跳表当前的长度,请参数小节3.9.1
llen = zsetLength(zobj);
//初始化一下开始游标和结束游标
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
//异常判断
if (start > end || start >= llen) {
addReply(c,shared.emptymultibulk);
return;
}
//得到获取元素的个数
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;
/* Return the result in form of a multi-bulk reply */
addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
//如果是压缩列表,这次我们不关心压测列表,可以先跳过这段代码
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (reverse)
eptr = ziplistIndex(zl,-2-(2*start));
else
eptr = ziplistIndex(zl,2*start);
serverAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
while (rangelen--) {
serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
addReplyBulkLongLong(c,vlong);
else
addReplyBulkCBuffer(c,vstr,vlen);
if (withscores)
addReplyDouble(c,zzlGetScore(sptr));
if (reverse)
zzlPrev(zl,&eptr,&sptr);
else
zzlNext(zl,&eptr,&sptr);
}
//如果是跳表
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
sds ele;
/* Check if starting point is trivial, before doing log(N) lookup. */
//如果逆序,则从后往前取
if (reverse) {
ln = zsl->tail;
//如果start不是从0开始,则先要找到start的那个起点的结点
if (start > 0)
//通过rank获取该结点,请参考3.9.2
ln = zslGetElementByRank(zsl,llen-start);
} else {
//从第一个结点开始
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl,start+1);
}
//循环这次去除取数据的个数,直到元素取完
while(rangelen--) {
//断言
serverAssertWithInfo(c,zobj,ln != NULL);
ele = ln->ele;
//把数据写到对client的缓冲区中,批量返回请参考3.9.3
addReplyBulkCBuffer(c,ele,sdslen(ele));
//假如带了withscores参数
if (withscores)
//把结点的score(double类型)也同时写到缓冲区,请参考3.9.4
addReplyDouble(c,ln->score);
//下一个结点,如果从头那,则用forward找到下一个结点,如果是从尾部拿,则用backward找打前一个结点
ln = reverse ? ln->backward : ln->level[0].forward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
}
这个过程就非常简单了,我们通过两张图来分别描述一下从头取元素,从尾取元素的过程。3.6.2 Zrange查询过程命令:zrange mytest 0 -1**,从头部取元素的过程,1 -> 11 -> 21 -> 23 结束。如图3.12:int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
//同样先从最高层找到要删除元素
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
//设置span和forward
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
//请参数3.9.5
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
删除结点的过程我们通过源代码能够得知分为两个步骤:查找要删除的结点; 设置span和forward;
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
//同样先从最高层找到要删除元素
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
中间每次循环的查询过程省略了(如果想了解请查看3.4小节或者3.6小节),通过循环赋值,最后得出update[2] 指向header、update[1] 指向score = 1的结点、update[0]指向score = 11的结点,如图3.14所示:/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
//更新每个update结点的span和forward
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
//调整backward
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
我们看一下更新每个update结点后的span和forward之后,跳的结构是什么样的?如图3.1.5所示:哈希表无序,我们的想实现一个有序的数据结构,这个不满足需求 没办法范围查询
实现难易,跳表实现比较简单,平衡树实现很复杂 范围查询方便,跳表只需要通过找到范围开始结点,然后顺着前后把元素找齐就行了,平衡树如果范围查询,则很困难,我们先要找到指定范围的小值之后,再通过中序遍历继续寻找其他不超过大值的结点。
unsigned long zsetLength(const robj *zobj) {
unsigned long length = 0;
//如果是压缩列表
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
length = zzlLength(zobj->ptr);
//如果是跳表
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
//直接返回结构跳表的长度
length = ((const zset*)zobj->ptr)->zsl->length;
} else {
serverPanic("Unknown sorted set encoding");
}
return length;
}
3.9.2 按照Rank查到该结点/* Finds an element by its rank. The rank argument needs to be 1-based. */
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
//也是同样的从最高层开始寻找
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
//如果找到rank相等的,返回该结点
if (traversed == rank) {
return x;
}
}
return NULL;
}
3.9.3 批量增加写数据到缓冲区/* Add a C buffer as bulk reply */
void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
addReplyLongLongWithPrefix(c,len,'$');
addReplyString(c,p,len);
addReply(c,shared.crlf);
}
3.9.4 写double类型的数据到缓冲区/* Add a double as a bulk reply */
void addReplyDouble(client *c, double d) {
char dbuf[128], sbuf[128];
int dlen, slen;
if (isinf(d)) {
/* Libc in odd systems (Hi Solaris!) will format infinite in a
* different way, so better to handle it in an explicit way. */
addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
} else {
dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
addReplyString(c,sbuf,slen);
}
}
3.9.5 释放跳表结点void zslFreeNode(zskiplistNode *node) {
sdsfree(node->ele);
zfree(node);
}
04
结语、参考资料
https://github.com/redis/redis/blob/unstable/src/server.h https://github.com/redis/redis/blob/unstable/src/server.c https://github.com/redis/redis/blob/unstable/src/t_set.c https://github.com/redis/redis/blob/unstable/src/ziplist.c https://github.com/redis/redis/blob/unstable/src/ziplist.h
我知道你“在看”哟~